package com.boot.base.kafka.config;

import com.boot.base.common.utils.ErrorUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ObjectUtils;

import java.util.List;

@Slf4j
@Configuration
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 批量拉取数据
        factory.setBatchListener(true);
        // 设置每个@kafkaListener的线程数
        factory.setConcurrency(3);
        // 设置手动提交AckMode
//        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }


    /**
     * 消费者异常处理
     * 比如 消费者处理超时，导致commit失败，就会调用handle方法
     *
     * @return
     */
    @Bean
    public ErrorHandler errorHandler() {
        return new ErrorHandler() {
            @Override
            public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
                // 这里data一直为null，不知为何
                log.error("kafka 消费者异常：{} \r\n ConsumerRecord = {}", ErrorUtils.getErrorMsg(thrownException), ObjectUtils.nullSafeToString(data));
            }

            @Override
            public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
                // 这个方法里可以拿到topic信息
                log.error("kafka 消费者异常：{} \r\n records = {} \r\n topics={} ", ErrorUtils.getErrorMsg(thrownException), ObjectUtils.nullSafeToString(records), consumer.subscription().toString());
            }
        };
    }

}
